跳到主要内容

Go Channel 的原理

管道 channel

通道(channel)是用来传递数据的一个数据结构。在 Java 中 JMM 是共享内存并发模型,而 Go 则是消息传递并发模型。

  • 消息传递并发模型:线程之间没有公共状态,线程之间必须通过发送消息来进行显示通信。
  • 共享内存并发模型:线程之间共享程序的公共状态,通过写-读内存中的公共状态来进行隐式通信。

Go语言中的通道(channel)是一种特殊的类型。在任何时候,同时只能有一个 goroutine 访问通道进行发送和获取数据。goroutine 间通过通道就可以通信。

通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。

声明一个通道

// 如果不指定容量,默认通道的容量是0,这种通道也成为非缓冲通道。
ch := make(chan int)

通道可用于两个 goroutine 之间通过传递一个指定类型的值来同步运行和通讯。操作符 <- 用于指定通道的方向,发送或接收。

ch <- value         // 发送 value 到 channel
<- ch // 接收并将其丢弃(可以利用这个来阻塞协程)
x := <- ch // 从 channel 中接收数据,并赋值给 x
x, ok := <- channel // 功能同时,同时检查管道是否已关闭或是否为空

注意:默认情况下,通道是不带缓冲区的。发送端发送数据,同时必须有接收端相应的接收数据。

例1:等待全部 goroutine 取得数字和

以下实例通过两个 goroutine 来计算数字之和,在 goroutine 完成计算后,它会计算两个结果的和:

package main

import "fmt"

func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // 把 sum 发送到通道 c
}

func main() {
s := []int{7, 2, 8, -9, 4, 0}

c := make(chan int)
go sum(s[:len(s)/2], c) // 数组切割成两半
go sum(s[len(s)/2:], c)

// 这里会等待两个 goroutine 完成,然后获取各自的返回值
x, y := <-c, <-c // 从通道 c 中接收

fmt.Println(x, y, x+y)
}

输出为:

-5 17 12

例2:消息的唯一性

以下每个消息只会被一个 goroutine 消费

func main() {
channel := make(chan int)

go consumer("goroutine01", channel)
go consumer("goroutine02", channel)
go consumer("goroutine03", channel)

for i := 0; i < 10; i++ {
channel <- i
}

select {
}
}

func consumer(name string, msg <-chan int) {
for v := range msg {
fmt.Println(name, v)
}
}

打印:

goroutine03 0
goroutine03 1
goroutine03 2
goroutine02 5
goroutine02 6
goroutine02 7
goroutine02 8
goroutine02 9
goroutine03 3
goroutine01 4
// ...

Channel 是同步的还是异步的?

在 Go 中,通道(Channel)既可以是同步的也可以是异步的,这取决于通道的使用方式和操作。

通道的同步性指的是发送和接收操作的行为:

  1. 同步通道(Synchronous Channel):当发送操作(channel <- value)和接收操作(value := <-channel)在通道上进行时,它们会阻塞当前的 Goroutine,直到发送或接收都准备就绪。发送操作和接收操作必须互相配对,发送者会等待接收者接收数据,而接收者会等待发送者发送数据。这种同步机制确保了通信的完整性和可靠性。

  2. 异步通道(Asynchronous Channel):在某些情况下,我们希望发送或接收操作不阻塞当前的 Goroutine,以便继续执行后续的代码。对于这种情况,Go 提供了带有缓冲区的通道(Buffered Channel)。通过在创建通道时指定缓冲区的容量,我们可以在发送和接收操作之间创建一个缓冲区,从而实现异步操作。发送操作只有在缓冲区未满时才会立即返回,而接收操作只有在缓冲区非空时才会立即返回。这种异步机制可以提高并发性能和灵活性。

总结而言,通道的同步性取决于发送和接收操作的方式。未缓冲的通道是同步的,而带有缓冲区的通道可以实现异步操作。开发人员可以根据需求选择适当的通道类型,以实现所需的同步或异步行为。

来看看源码

在 Golang 的源代码中,channel 相关的源码主要在 src/runtime/chan.go 文件中。

channel 的结构(创建)

chan.go 文件定义了 channel 相关的结构体和函数

  • hchan 结构体(表示 channel)
  • makechan 函数(创建 channel)
  • chansend 函数(发送数据到 channel)
  • chanrecv 函数(从 channel 接收数据)
type hchan struct {
qcount uint // 当前队列中剩余元素个数
dataqsiz uint // 环形队列长度,即可以存放的元素个数
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的大小
closed uint32 // 标识关闭状态
elemtype *_type // 元素类型
sendx uint // 队列下标,指示元素写入时存放到队列中的位置
recvx uint // 队列下标,指示元素从队列的该位置读出
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
lock mutex // 互斥锁,chan不允许并发读写
}

// 这个 waitq 等待队列
type waitq struct {
first *sudog
last *sudog
}

chan 内部实现了一个环形队列作为其缓冲区,队列的长度是创建 chan 时指定的。

c := make(chan int, 6)

下图展示了一个可缓存6个元素的 channel 示意图:

20230427161552

dataqsiz 指示了队列长度为 6,即可缓存 6 个元素; buf 指向队列的内存,队列中还剩余两个元素; qcount 表示队列中还有两个元素; sendx 指示后续写入的数据存储的位置,取值 [0, 6); recvx 指示从该位置读取数据, 取值 [0, 6)

channel 的两个队列

上面的 waitq 是一个等待队列,用于存放等待读或写的 goroutine。

recvq    waitq          // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列

写的 goroutine 会根据是否还有缓存空间来决定是否阻塞,如果没有缓存空间,写 goroutine 会阻塞,然后被丢到 sendq 等待队列中,等待读 goroutine 读取数据后再唤醒它; 同理如果没有数据,读 goroutine 会阻塞,然后被丢到 recvq 等待队列中,等待写 goroutine 写入数据后再唤醒它。

channel 关闭后

这里直接看源码:

// src/runtime/chan.go:358

func closechan(c *hchan) {
// 可以发现,如果 chan 是 nil 或者已经关闭,都会 panic。
if c == nil {
panic(plainError("close of nil channel"))
}

lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

// ...


var glist gList

// release all readers
for {
sg := c.recvq.dequeue()
// ...
glist.push(gp)
}

// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
// ...
glist.push(gp)
}

//
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}

当一个 channel 关闭时,运行时系统会遍历所有阻塞在该 channel 上的 goroutine,将它们从阻塞队列中移除,并将它们加入到 glist 中,然后将这个 channel 的状态设置为已关闭。接着,所有阻塞在该 channel 上的 goroutine 都会被唤醒,并返回特殊的值。如果有多个 goroutine 阻塞在该 channel 上,它们将按照一定的策略(如 FIFO)依次被唤醒。

将所有阻塞在 channel 上的 goroutine 放到 glist 中的原因是,这些 goroutine 已经不能再被 channel 阻塞,需要重新调度,重新分配 CPU 时间片。由于这些 goroutine 都已经被唤醒,如果它们没有及时被重新调度,可能会占用大量的系统资源,导致系统的性能下降。

总之,当一个 channel 关闭时,会将所有阻塞在该 channel 上的 goroutine 移动到全局运行队列中,并返回特殊值,以便这些 goroutine 可以及时被重新调度和执行。